{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Classification - SparkML vs SynapseML"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "tags": [
     "hide-synapse-internal"
    ]
   },
   "source": [
    "<p><img src=\"https://images-na.ssl-images-amazon.com/images/G/01/img16/books/bookstore/landing-page/1000638_books_landing-page_bookstore-photo-01.jpg\" style=\"width: 500px;\" title=\"Image from https://images-na.ssl-images-amazon.com/images/G/01/img16/books/bookstore/landing-page/1000638_books_landing-page_bookstore-photo-01.jpg\" /><br /></p>"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this article, you perform the same classification task in two\n",
    "different ways: once using plain **`pyspark`** and once using the\n",
    "**`synapseml`** library.  The two methods yield the same performance,\n",
    "but highlights the simplicity of using `synapseml` compared to `pyspark`.\n",
    "\n",
    "The task is to predict whether a customer's review of a book sold on\n",
    "Amazon is good (rating > 3) or bad based on the text of the review. You\n",
    "accomplish it by training LogisticRegression learners with different\n",
    "hyperparameters and choosing the best model."
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Setup\n",
    "Import necessary Python libraries and get a spark session."
   ]
  },
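  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# A minimal setup sketch: in hosted Spark environments (such as Synapse\n",
    "# or Fabric) a `spark` session is usually predefined, making this a no-op.\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "spark = SparkSession.builder.getOrCreate()"
   ]
  },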
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read the data\n",
    "\n",
    "Download and read in the data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rawData = spark.read.parquet(\n",
    "    \"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet\"\n",
    ")\n",
    "rawData.show(5)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Extract features and process data\n",
    "\n",
    "Real data is more complex than the above dataset. It's common\n",
    "for a dataset to have features of multiple types, such as text, numeric, and\n",
    "categorical. To illustrate how difficult it's to work with these\n",
    "datasets, add two numerical features to the dataset: the **word count** of the review and the **mean word length**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import udf\n",
    "from pyspark.sql.types import *\n",
    "\n",
    "\n",
    "def wordCount(s):\n",
    "    return len(s.split())\n",
    "\n",
    "\n",
    "def wordLength(s):\n",
    "    import numpy as np\n",
    "\n",
    "    ss = [len(w) for w in s.split()]\n",
    "    return round(float(np.mean(ss)), 2)\n",
    "\n",
    "\n",
    "wordLengthUDF = udf(wordLength, DoubleType())\n",
    "wordCountUDF = udf(wordCount, IntegerType())"
   ]
  },
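  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick illustrative sanity check (not part of the original pipeline), the plain-Python helpers can be called directly on a sample string before wrapping them in UDFs:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(wordCount(\"an example review\"))  # 3\n",
    "print(wordLength(\"an example review\"))  # 5.0"
   ]
  },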
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.stages import UDFTransformer\n",
    "\n",
    "wordLength = \"wordLength\"\n",
    "wordCount = \"wordCount\"\n",
    "wordLengthTransformer = UDFTransformer(\n",
    "    inputCol=\"text\", outputCol=wordLength, udf=wordLengthUDF\n",
    ")\n",
    "wordCountTransformer = UDFTransformer(\n",
    "    inputCol=\"text\", outputCol=wordCount, udf=wordCountUDF\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml import Pipeline\n",
    "\n",
    "data = (\n",
    "    Pipeline(stages=[wordLengthTransformer, wordCountTransformer])\n",
    "    .fit(rawData)\n",
    "    .transform(rawData)\n",
    "    .withColumn(\"label\", rawData[\"rating\"] > 3)\n",
    "    .drop(\"rating\")\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data.show(5)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Classify using pyspark\n",
    "\n",
    "To choose the best LogisticRegression classifier using the `pyspark`\n",
    "library, we need to *explicitly* perform the following steps:\n",
    "\n",
    "1. Process the features:\n",
    "   - Tokenize the text column\n",
    "   - Hash the tokenized column into a vector using hashing\n",
    "   - Merge the numeric features with the vector\n",
    "2. Process the label column: cast it into the proper type.\n",
    "3. Train multiple LogisticRegression algorithms on the `train` dataset\n",
    "   with different hyperparameters\n",
    "4. Compute the area under the ROC curve for each of the trained models\n",
    "   and select the model with the highest metric as computed on the\n",
    "   `test` dataset\n",
    "5. Evaluate the best model on the `validation` set"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import Tokenizer, HashingTF\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "\n",
    "# Featurize text column\n",
    "tokenizer = Tokenizer(inputCol=\"text\", outputCol=\"tokenizedText\")\n",
    "numFeatures = 10000\n",
    "hashingScheme = HashingTF(\n",
    "    inputCol=\"tokenizedText\", outputCol=\"TextFeatures\", numFeatures=numFeatures\n",
    ")\n",
    "tokenizedData = tokenizer.transform(data)\n",
    "featurizedData = hashingScheme.transform(tokenizedData)\n",
    "\n",
    "# Merge text and numeric features in one feature column\n",
    "featureColumnsArray = [\"TextFeatures\", \"wordCount\", \"wordLength\"]\n",
    "assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol=\"features\")\n",
    "assembledData = assembler.transform(featurizedData)\n",
    "\n",
    "# Select only columns of interest\n",
    "# Convert rating column from boolean to int\n",
    "processedData = assembledData.select(\"label\", \"features\").withColumn(\n",
    "    \"label\", assembledData.label.cast(IntegerType())\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
    "from pyspark.ml.classification import LogisticRegression\n",
    "\n",
    "# Prepare data for learning\n",
    "train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)\n",
    "\n",
    "# Train the models on the 'train' data\n",
    "lrHyperParams = [0.05, 0.1, 0.2, 0.4]\n",
    "logisticRegressions = [\n",
    "    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams\n",
    "]\n",
    "evaluator = BinaryClassificationEvaluator(\n",
    "    rawPredictionCol=\"rawPrediction\", metricName=\"areaUnderROC\"\n",
    ")\n",
    "metrics = []\n",
    "models = []\n",
    "\n",
    "# Select the best model\n",
    "for learner in logisticRegressions:\n",
    "    model = learner.fit(train)\n",
    "    models.append(model)\n",
    "    scoredData = model.transform(test)\n",
    "    metrics.append(evaluator.evaluate(scoredData))\n",
    "bestMetric = max(metrics)\n",
    "bestModel = models[metrics.index(bestMetric)]\n",
    "\n",
    "# Get AUC on the validation dataset\n",
    "scoredVal = bestModel.transform(validation)\n",
    "print(evaluator.evaluate(scoredVal))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Classify using SynapseML\n",
    "\n",
    "The steps needed with `synapseml` are simpler:\n",
    "\n",
    "1. The **`TrainClassifier`** Estimator featurizes the data internally,\n",
    "   as long as the columns selected in the `train`, `test`, `validation`\n",
    "   dataset represent the features\n",
    "\n",
    "2. The **`FindBestModel`** Estimator finds the best model from a pool of\n",
    "   trained models by finding the model that performs best on the `test`\n",
    "   dataset given the specified metric\n",
    "\n",
    "3. The **`ComputeModelStatistics`** Transformer computes the different\n",
    "   metrics on a scored dataset (in our case, the `validation` dataset)\n",
    "   at the same time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.train import TrainClassifier, ComputeModelStatistics\n",
    "from synapse.ml.automl import FindBestModel\n",
    "\n",
    "# Prepare data for learning\n",
    "train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)\n",
    "\n",
    "# Train the models on the 'train' data\n",
    "lrHyperParams = [0.05, 0.1, 0.2, 0.4]\n",
    "logisticRegressions = [\n",
    "    LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams\n",
    "]\n",
    "lrmodels = [\n",
    "    TrainClassifier(model=lrm, labelCol=\"label\", numFeatures=10000).fit(train)\n",
    "    for lrm in logisticRegressions\n",
    "]\n",
    "\n",
    "# Select the best model\n",
    "bestModel = FindBestModel(evaluationMetric=\"AUC\", models=lrmodels).fit(test)\n",
    "\n",
    "\n",
    "# Get AUC on the validation dataset\n",
    "predictions = bestModel.transform(validation)\n",
    "metrics = ComputeModelStatistics().transform(predictions)\n",
    "print(\n",
    "    \"Best model's AUC on validation set = \"\n",
    "    + \"{0:.2f}%\".format(metrics.first()[\"AUC\"] * 100)\n",
    ")"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
